/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/*
 * $Id: we_redistributecontrol.cpp 4450 2013-01-21 14:13:24Z rdempsey $
 */

#include <iostream>
#include <set>
#include <vector>
#include <cassert>
#include <stdexcept>
#include <sstream>
#include <string>
#include <ctime>
#include <unistd.h>
//#include <sys/stat.h>
using namespace std;

#include "boost/scoped_ptr.hpp"
#include "boost/scoped_array.hpp"
#include "boost/thread.hpp"
#include "boost/thread/mutex.hpp"
#include "boost/filesystem/path.hpp"
#include "boost/filesystem/operations.hpp"
using namespace boost;

#include "installdir.h"

#include "configcpp.h"
using namespace config;

#include "liboamcpp.h"
using namespace oam;

#include "messagequeue.h"
#include "bytestream.h"
using namespace messageqcpp;

#include "logger.h"

#include "calpontsystemcatalog.h"
using namespace execplan;

#include "we_messages.h"
#include "we_redistributedef.h"
#include "we_redistributecontrolthread.h"
#include "we_redistributeworkerthread.h"
#include "we_redistributecontrol.h"

namespace redistribute
{
RedistributeControl* RedistributeControl::fInstance = NULL;
boost::mutex instanceMutex;

const string RedistributeDir("/data1/systemFiles/redistribute");
const string InfoFileName("/redistribute.info");
const string PlanFileName("/redistribute.plan");

RedistributeControl* RedistributeControl::instance()
{
  // The constructor is protected by instanceMutex lock.
  boost::mutex::scoped_lock lock(instanceMutex);

  if (fInstance == NULL)
    fInstance = new RedistributeControl();

  return fInstance;
}

RedistributeControl::RedistributeControl() : fInfoFilePtr(NULL), fPlanFilePtr(NULL)
{
  // default path /usr/local/mariadb/columnstore/data1/systemFiles/redistribute
  fRedistributeDir = "/var/lib/columnstore/" + RedistributeDir;
  fInfoFilePath = fRedistributeDir + InfoFileName;
  fPlanFilePath = fRedistributeDir + PlanFileName;

  fOam.reset(new oam::Oam);
  fDbrm.reset(new BRM::DBRM);
  fSysLogger.reset(new logging::Logger(32));  // 32 - writeengineserver in SubsystemIDs.txt
  logging::MsgMap msgMap;
  msgMap[logging::M0002] = logging::Message(logging::M0002);
  fSysLogger->msgMap(msgMap);

  // struct stat st;
  // if (stat(fRedistributeDir.c_str(), &st) != 0)
  // boost::filesystem::path dirPath(fRedistributeDir);
  if (boost::filesystem::exists(fRedistributeDir))
  {
    // try to open info file for update if dir exists
    RedistributeInfo info;
    fInfoFilePtr = fopen(fInfoFilePath.c_str(), "r+");

    if (fInfoFilePtr != NULL && 1 == fread(&info, sizeof(info), 1, fInfoFilePtr))
    {
      fRedistributeInfo = info;

      // if there was an active session, mark it as failed until support resume.
      if (fRedistributeInfo.state == RED_STATE_ACTIVE)
        updateState(RED_STATE_FAILED);
    }
  }
}

RedistributeControl::~RedistributeControl()
{
  fOam.reset();
  fDbrm.reset();
  delete fInstance;
  fInstance = NULL;
}

int RedistributeControl::handleUIMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so)
{
  boost::mutex::scoped_lock sessionLock(fSessionMutex);

  uint32_t status = RED_STATE_UNDEF;
  const RedistributeMsgHeader* h = (const RedistributeMsgHeader*)bs.buf();

  try
  {
    switch (h->messageId)
    {
      case RED_CNTL_START: status = handleStartMsg(bs, so); break;

      case RED_CNTL_STOP: status = handleStopMsg(bs, so); break;

      case RED_CNTL_CLEAR: status = handleClearMsg(bs, so); break;

      case RED_CNTL_STATUS:
      default: status = handleStatusMsg(bs, so); break;
    }
  }
  catch (const std::exception& ex)
  {
    if (fUIResponse.empty())
      fUIResponse = ex.what();
  }
  catch (...)
  {
    if (fUIResponse.empty())
      fUIResponse = "Failed to process the redistribute command.";
  }

  // log the response
  logMessage(fUIResponse);

  //	bs restart() in handlers
  bs.restart();
  bs << (ByteStream::byte)WriteEngine::WE_SVR_REDISTRIBUTE;  // dummy, keep for now.
  bs << status;
  bs << fUIResponse;
  so.write(bs);

  return status;
}

int RedistributeControl::handleStartMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so)
{
  ostringstream oss;
  uint32_t status = getCurrentState();

  if (status != RED_STATE_IDLE)
  {
    if (status == RED_STATE_ACTIVE)
      oss << "Redistribute is already running.  Command is ignored.  You need to stop and clear this active "
             "session before starting a new one.";
    else
      oss << "Redistribute is not in IDLE state.  Command is ignored.  Please check the status of last "
             "session, then reset the state to IDLE using action CLEAR.";

    fUIResponse = oss.str();

    return status;
  }

  // must be IDLE state
  try
  {
    // skip the header part, may need save it.
    bs.advance(sizeof(RedistributeMsgHeader));

    getStartOptions(bs);
    RedistributeControlThread::setStopAction(false);
    updateState(RED_STATE_ACTIVE);
  }
  catch (const std::exception& ex)
  {
    if (fErrorMsg.empty())
      fErrorMsg = ex.what();

    fRedistributeInfo.state = RED_STATE_FAILED;
  }
  catch (...)
  {
    fRedistributeInfo.state = RED_STATE_FAILED;
  }

  status = fRedistributeInfo.state;

  if (status == RED_STATE_ACTIVE)
  {
    oss << "Redistribute is started.";
    fControlThread.reset(new boost::thread(RedistributeControlThread(RED_CNTL_START)));
    // Let go the new thread unless we want to call interrupt on this thread in future.
    // Not going to join() because the redistribution could take very long.
    fControlThread->detach();
    fControlThread.reset();
  }
  else
  {
    updateState(RED_STATE_FAILED);
    oss << "Starting redistribute failed.";

    if (!fErrorMsg.empty())
      oss << "  " << fErrorMsg;
  }

  fUIResponse = oss.str();

  return status;
}

int RedistributeControl::handleStatusMsg(messageqcpp::ByteStream&, messageqcpp::IOSocket& so)
{
  ostringstream oss;
  uint32_t status = getCurrentState();
  RedistributeInfo info = fRedistributeInfo;

  switch (status)
  {
    case RED_STATE_IDLE: oss << "Redistribute is in IDLE state."; break;

    case RED_STATE_ACTIVE:
      oss << "Redistribute is in progress: total " << info.planned;

      if (info.planned > 1)
        oss << " logical partitions are planned to move.\n";
      else
        oss << " logical partition is planned to move.\n";

      if (info.planned > 0)
      {
        if (info.endTime > 0)
          oss << "In " << (info.endTime - info.startTime) << " seconds, ";

        oss << info.success << " success, " << info.skipped << " skipped, " << info.failed << " failed, "
            << ((info.success + info.skipped + info.failed) * 100 / info.planned) << "%.";
      }

      break;

    case RED_STATE_FINISH:
      oss << "Redistribute is finished.\n"
          << info.success << " success, " << info.skipped << " skipped, " << info.failed << " failed.\n";

      if (info.endTime > 0)
        oss << "Total time: " << (info.endTime - info.startTime) << " seconds.\n";

      break;

    case RED_STATE_FAILED:
      oss << "Redistribute is failed.\n";

      try
      {
        size_t l = 0;  // message length
        size_t n = fread(&l, sizeof(int), 1, fInfoFilePtr);

        if (n == 1)
        {
          boost::scoped_array<char> buf(new char[l + 1]);
          n = fread(buf.get(), 1, l, fInfoFilePtr);

          if (n == l)
          {
            buf[l] = '\0';
            fErrorMsg += buf.get();
            oss << buf.get();
          }
        }
      }
      catch (const std::exception&)
      {
      }
      catch (...)
      {
      }

      break;

    case RED_STATE_STOPPED:
      oss << "Redistribute is stopped by user.\n";

      if (info.planned > 0)
      {
        if (info.endTime > 0)
          oss << "In " << (info.endTime - info.startTime) << " seconds, ";

        oss << info.success << " success, " << info.skipped << " skipped, " << info.failed << " failed, "
            << ((info.success + info.skipped + info.failed) * 100 / info.planned) << "%.";
      }

      break;

    default:
      oss << "Failed to retrieve redistribute information, the file " << fInfoFilePath
          << " may be corrupted.";
      break;
  }

  fUIResponse = oss.str();

  return status;
}

int RedistributeControl::handleStopMsg(messageqcpp::ByteStream&, messageqcpp::IOSocket& so)
{
  ostringstream oss;
  uint32_t status = getCurrentState();

  if (status != RED_STATE_ACTIVE)
  {
    oss << "Redistribute is not running. Command is ignored.";
  }
  else
  {
    RedistributeControlThread::setStopAction(true);
    updateState(RED_STATE_STOPPED);
    status = RED_STATE_STOPPED;
    boost::thread rct((RedistributeControlThread(RED_CNTL_STOP)));
    rct.join();
    oss << "Redistribute is stopped.";
  }

  fUIResponse = oss.str();

  return status;
}

int RedistributeControl::handleClearMsg(messageqcpp::ByteStream&, messageqcpp::IOSocket& so)
{
  ostringstream oss;
  uint32_t status = getCurrentState();

  if (status == RED_STATE_ACTIVE)
  {
    oss << "Redistribute is running. Command is ignored. To CLEAR, you have to wait or stop the running "
           "session.";
  }
  else
  {
    updateState(RED_STATE_IDLE);
    status = RED_STATE_IDLE;
    oss << "Cleared.";
  }

  fUIResponse = oss.str();

  return status;
}

uint32_t RedistributeControl::getCurrentState()
{
  uint32_t status = RED_STATE_UNDEF;
  ostringstream oss;
  boost::mutex::scoped_lock lock(fInfoFileMutex);

  if (!fInfoFilePtr)
  {
    status = RED_STATE_IDLE;
  }
  else
  {
    rewind(fInfoFilePtr);
    RedistributeInfo info;
    size_t n = fread(&info, sizeof(info), 1, fInfoFilePtr);

    if (n == 1)
    {
      fRedistributeInfo = info;
      status = info.state;
    }
  }

  return status;
}

bool RedistributeControl::getStartOptions(messageqcpp::ByteStream& bs)
{
  bool ret = true;
  uint32_t n = 0;
  uint32_t d = 0;

  try
  {
    bs >> fOptions;

    bs >> n;
    fSourceList.clear();
    fSourceList.reserve(n);

    for (uint32_t i = 0; i < n; i++)
    {
      bs >> d;
      fSourceList.push_back(d);
    }

    bs >> n;
    fDestinationList.clear();
    fDestinationList.reserve(n);

    for (uint32_t i = 0; i < n; i++)
    {
      bs >> d;
      fDestinationList.push_back(d);
    }

    if (fSourceList.size() == 0 || fDestinationList.size() == 0)
      throw runtime_error("Failed to get dbroot lists.");
  }
  catch (const std::exception& ex)
  {
    ret = false;
    fErrorMsg = ex.what();
  }
  catch (...)
  {
    ret = false;
    fErrorMsg = "Failed to get dbroot lists.";
  }

  return ret;
}

void RedistributeControl::updateState(uint32_t s)
{
  boost::mutex::scoped_lock lock(fInfoFileMutex);

  // allowed state change:
  //   idle    ->  active
  //   active  ->  finish
  //   active  ->  stopped
  //   active  ->  failed
  //   finish  ->  idle
  //   stopped ->  idle
  //   failed  ->  idle

  if (s == RED_STATE_IDLE)
  {
    if (fRedistributeInfo.state == RED_STATE_ACTIVE)
      return;

    // close the files if they are already opened
    if (fInfoFilePtr != NULL)
    {
      fclose(fInfoFilePtr);
      fInfoFilePtr = NULL;
    }

    if (fPlanFilePtr != NULL)
    {
      fclose(fPlanFilePtr);
      fPlanFilePtr = NULL;
    }

    // move old files to archive
    // zip or compress if the .plan file gets large
    time_t t = fRedistributeInfo.startTime;

    if (t == 0)
      t = time(NULL);

    ostringstream oss;
    struct tm m;
    localtime_r(&t, &m);
    oss << setfill('0') << setw(4) << (m.tm_year + 1900) << setw(2) << (m.tm_mon + 1) << setw(2)
        << (m.tm_mday) << setw(2) << (m.tm_hour) << setw(2) << (m.tm_min) << setw(2) << (m.tm_sec);

    try
    {
      if (boost::filesystem::exists(fInfoFilePath) && boost::filesystem::exists(fPlanFilePath))
      {
        bool mergeOk = false;
        FILE* infoPtr = fopen(fInfoFilePath.c_str(), "r+b");
        FILE* entryPtr = fopen(fPlanFilePath.c_str(), "rb");
        int rc = 1;

        if (infoPtr != NULL && entryPtr != NULL)
        {
          rc = fseek(infoPtr, sizeof(RedistributeInfo), SEEK_SET);
          RedistributePlanEntry entry;

          while (rc == 0)
          {
            size_t n = fread(&entry, sizeof(entry), 1, entryPtr);

            if (n != 1)
              break;

            n = fwrite(&entry, sizeof(entry), 1, infoPtr);
            fflush(infoPtr);

            if (n != 1)
              rc = -1;
          }
        }

        if (rc == 0 && feof(entryPtr))
          mergeOk = true;

        if (infoPtr != NULL)
          fclose(infoPtr);

        if (entryPtr != NULL)
          fclose(entryPtr);

        if (mergeOk)
          boost::filesystem::remove(fPlanFilePath);
      }

      if (boost::filesystem::exists(fInfoFilePath))
      {
        string newInfoPath = fRedistributeDir + "/archive" + InfoFileName + "." + oss.str();
        boost::filesystem::rename(fInfoFilePath, newInfoPath);
      }

      if (boost::filesystem::exists(fPlanFilePath))
      {
        string newPlanPath = fRedistributeDir + "/archive" + PlanFileName + "." + oss.str();
        boost::filesystem::rename(fPlanFilePath, newPlanPath);
      }
    }
    catch (const std::exception&)
    {
    }
    catch (...)
    {
    }

    fRedistributeInfo = RedistributeInfo();
    return;
  }

  // safety check
  if (s != RED_STATE_ACTIVE && fRedistributeInfo.state != RED_STATE_ACTIVE)
    return;

  // in IDLE state there is no redistribute.info file
  if (s == RED_STATE_ACTIVE)
  {
    //		boost::filesystem::path dirPath(fRedistributeDir);
    //		if (boost::filesystem::exists(fRedistributeDir) &&
    //!boost::filesystem::is_directory(fRedistributeDir)) 			boost::filesystem::remove(fRedistributeDir);
    if (!boost::filesystem::exists(fRedistributeDir))
    {
      errno = 0;
      boost::filesystem::create_directory(fRedistributeDir);

      if (!boost::filesystem::exists(fRedistributeDir))
      {
        int e = errno;
        ostringstream oss;
        oss << "Failed to create redistribute directory: ";
        oss << strerror(e) << " (" << e << ")";
        throw runtime_error(oss.str());
      }

      errno = 0;
      boost::filesystem::path archivePath(fRedistributeDir + "/archive");
      boost::filesystem::create_directory(archivePath);

      if (!boost::filesystem::exists(archivePath))
      {
        int e = errno;
        ostringstream oss;
        oss << "Failed to create redistribute archive directory: ";
        oss << strerror(e) << " (" << e << ")";
        throw runtime_error(oss.str());
      }
    }

    fRedistributeInfo.startTime = time(NULL);
  }

  // open the info file to write
  errno = 0;

  if (fInfoFilePtr == NULL)
    fInfoFilePtr = fopen(fInfoFilePath.c_str(), "w+");

  if (fInfoFilePtr == NULL)
  {
    int e = errno;
    ostringstream oss;
    oss << "Failed to open " << fInfoFilePath << ": " << strerror(e) << " (" << e << ")";
    throw runtime_error(oss.str());
  }

  fRedistributeInfo.state = s;

  if (s == RED_STATE_FINISH)
    fRedistributeInfo.endTime = time(NULL);

  rewind(fInfoFilePtr);
  size_t n = fwrite(&fRedistributeInfo, sizeof(fRedistributeInfo), 1, fInfoFilePtr);

  if (n != 1)
  {
    fclose(fInfoFilePtr);
    fInfoFilePtr = NULL;

    int e = errno;
    ostringstream oss;
    oss << "Failed to write into " << fInfoFilePath << ": " << strerror(e) << " (" << e << ")";
    throw runtime_error(oss.str());
  }

  fflush(fInfoFilePtr);
}

void RedistributeControl::setEntryCount(uint32_t entryCount)
{
  boost::mutex::scoped_lock lock(fInfoFileMutex);
  fRedistributeInfo.planned = entryCount;

  rewind(fInfoFilePtr);
  fwrite(&fRedistributeInfo, sizeof(fRedistributeInfo), 1, fInfoFilePtr);
  fflush(fInfoFilePtr);
}

void RedistributeControl::updateProgressInfo(uint32_t s, time_t t)
{
  boost::mutex::scoped_lock lock(fInfoFileMutex);
  fRedistributeInfo.endTime = t;

  switch (s)
  {
    case RED_TRANS_SUCCESS: fRedistributeInfo.success++; break;

    case RED_TRANS_SKIPPED: fRedistributeInfo.skipped++; break;

    default: fRedistributeInfo.failed++; break;
  }

  rewind(fInfoFilePtr);
  fwrite(&fRedistributeInfo, sizeof(fRedistributeInfo), 1, fInfoFilePtr);
  fflush(fInfoFilePtr);
}

int RedistributeControl::handleJobMsg(messageqcpp::ByteStream& bs, messageqcpp::IOSocket& so)
{
  //	boost::mutex::scoped_lock jobLock(fJobMutex);

  uint32_t status = RED_TRANS_SUCCESS;

  try
  {
    fWorkThread.reset(new boost::thread(RedistributeWorkerThread(bs, so)));
    fWorkThread->join();
  }
  catch (const std::exception& ex)
  {
    status = RED_TRANS_FAILED;
    logMessage(ex.what());
  }
  catch (...)
  {
    status = RED_TRANS_FAILED;
  }

  return status;
}

void RedistributeControl::logMessage(const string& msg)
{
  logging::Message::Args args;
  args.add(string("RED:"));
  args.add(msg);

  fSysLogger->logMessage(logging::LOG_TYPE_INFO, logging::M0002, args, logging::LoggingID(32, 0, 0));
}

}  // namespace redistribute

